package bigdata.jobclean

import bigdata.jobclean.parser.qcParser
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, _}

// Empty companion class for the JobCleaner object.
// NOTE(review): no members and no visible usages in this file — presumably a
// leftover from project scaffolding; confirm before removing.
class JobCleaner {

}

/*
 * Main object for cleaning the raw job-postings data set:
 * parses the CSV into Job records, drops malformed and duplicate rows,
 * removes cities with too few postings, and writes the result as CSV.
 */
object JobCleaner {

  // Encoder used for every Dataset[Job] transformation below.
  val jobEncoder: Encoder[Job] = Encoders.product[Job]

  /**
   * Entry point.
   *
   * Optional positional arguments:
   *   args(0) — input CSV path
   *   args(1) — output directory
   *   args(2) — minimum posting count a city must have to be kept
   */
  def main(args: Array[String]): Unit = {
    // Defaults, overridden by command-line arguments when supplied.
    var csvPath = "in/jobs_0406.csv"
    var outPath = "result/cleaned/"
    // Cities whose posting count is <= this threshold are dropped.
    var cityMin = 1000
    // FIX: the original guard was `args.length > 1` but unconditionally read
    // args(2), throwing ArrayIndexOutOfBoundsException when exactly two
    // arguments were passed. Read args(2) only when it exists.
    if (args.length >= 2) {
      csvPath = args(0)
      outPath = args(1)
      if (args.length >= 3) cityMin = args(2).toInt
    }
    println("-----------工作清洗------")
    println(s"输入路径:${csvPath}\n输出路径:${outPath}\n城市最少的招聘数量:${cityMin}")
    println(s"csv: ${csvPath} out:${outPath}")

    val spark: SparkSession = SparkSession.builder()
      .master("local[2]").appName("jobClean").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    val parser = new qcParser()

    // Broadcast the parser so each executor receives one copy instead of the
    // parser being serialized into every task closure.
    val bparser = spark.sparkContext.broadcast(parser)

    // Read the raw CSV (header row, UTF-8).
    val rawdata = spark.read
      .option("header", "true").option("encoding", "utf-8").csv(csvPath)

    // Parse each raw row into a Job record.
    var jobs = rawdata.map(row => {
      bparser.value.parseJob(row)
    })(jobEncoder)

    // Accumulators counting each kind of invalid field value.
    val salaryCount = spark.sparkContext.longAccumulator
    val eduCount = spark.sparkContext.longAccumulator
    val expCount = spark.sparkContext.longAccumulator
    val cityCount = spark.sparkContext.longAccumulator
    // Rows dropped because their city has too few postings.
    val invalidCity = spark.sparkContext.longAccumulator
    // Rows dropped because the requested headcount is too large.
    val invalidNum = spark.sparkContext.longAccumulator

    // Count invalid values per field. NOTE(review): accumulators updated in an
    // action can double-count if the dataset is recomputed — acceptable here
    // since `jobs` is evaluated once for this statistic.
    jobs.foreach(job => {
      if (job.salary == -1) salaryCount.add(1)
      if (job.edu == "NULL") eduCount.add(1)
      if (job.exp == "NULL") expCount.add(1)
      if (job.city == "NULL") cityCount.add(1)
    })

    // Drop malformed rows; mis-aligned records are filtered out here.
    // Rows with company size "未知规模" (unknown scale) are removed as well.
    // FIX: removed the duplicated `job.city != "NULL"` condition.
    jobs = jobs.filter(job => {
      job.salary != -1 && job.exp != "NULL" && job.edu != "NULL" && job.pubtime != "NULL" &&
        job.city != "NULL" && job.num != -1 && job.ctype != "NULL" && job.cnum != "未知规模"
    })

    // Map each city to its province via the lookup table.
    // FIX: use the broadcast parser (consistent with parseJob above) instead
    // of capturing the driver-side `parser` in the closure, which would be
    // re-serialized with every task.
    jobs = jobs.map(job => {
      job.province = bparser.value.getProvince(job.city)
      job
    })(jobEncoder)

    // De-duplicate: many rows are identical in every field except the URL,
    // so compare on all fields but "url".
    jobs = jobs.dropDuplicates(Array("name", "salary", "province", "city", "exp", "edu", "num", "pubtime", "cname", "ctype", "ctrade", "cnum", "cate1", "cate2", "welfare", "detail"))

    // ---- Drop cities whose posting count is <= cityMin ----
    val window = Window.partitionBy("city")
    val result = jobs.withColumn("ccount", count("url").over(window))
      .filter(row => {
        if (row.getAs[Long]("ccount") <= cityMin) {
          invalidCity.add(1)
          false
        }
        else {
          true
        }
      })
    // Remove the temporary per-city count column.
    jobs = result.drop("ccount").as[Job](jobEncoder)
    // ------------------------------------------------------

    // Drop postings that ask for 10 or more people.
    jobs = jobs.filter(job => {
      if (job.num < 10) {
        true
      } else {
        invalidNum.add(1)
        false
      }
    })

    // Write the cleaned data set.
    jobs.repartition(5).write.option("header", "false").mode(SaveMode.Overwrite).csv(outPath)

    println("=================")
    println("工资错误:" + salaryCount.value)
    println("学历错误:" + eduCount.value)
    println("经验错误:" + expCount.value)
    println("城市错误:" + cityCount.value)
    println(s"城市招聘数量小于${cityMin}的:" + invalidCity.value)
    println("招聘人数大于10得:" + invalidNum.value)
    print("-------------------")
  }

  /** Prints group-by counts for several Job fields (debugging helper). */
  def printJob(jobs: Dataset[Job]) = {
    val eduGroup = jobs.groupBy("edu").count().collect()
    val expGroup = jobs.groupBy("exp").count().collect()
    val cityGroup = jobs.groupBy("city").count().collect()
    val cnumberGroup = jobs.groupBy("cnum").count().collect()
    val provinceGroup = jobs.groupBy("province").count().collect()
    println(cnumberGroup.length + " cnum:" + cnumberGroup.mkString(","))
    println(eduGroup.length + " edu:" + eduGroup.mkString(","))
    println(expGroup.length + "exp:" + expGroup.mkString(","))
    println(cityGroup.length + "city:" + cityGroup.mkString(","))
    println(provinceGroup.length + "province:" + provinceGroup.mkString(","))
  }

}
